package com.ybw.order.demo.service.impl;

import com.ybw.order.demo.entity.MqConsumer;
import com.ybw.order.demo.mapper.MqConsumerMapper;
import com.ybw.order.demo.service.MqConsumerService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ybw.order.demo.utils.GsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.function.Consumer;

/**
 * Service implementation that runs a message handler and then records the
 * handled message as an {@link MqConsumer} row, skipping messages whose keys
 * have already been recorded.
 *
 * @author ybwei
 * @since 2022-09-04
 */
@Service
@Slf4j
public class MqConsumerServiceImpl extends ServiceImpl<MqConsumerMapper, MqConsumer> implements MqConsumerService {

    /**
     * Handles a message at most once per message key.
     *
     * <p>Steps: (1) return if a record with the same keys already exists,
     * (2) return if the body is missing or blank, (3) pass the decoded body to
     * {@code consumer}, (4) persist a record of the message.
     *
     * <p>NOTE: the handler runs before the record is written and the lookup and
     * insert are separate statements, so this method alone does not prevent two
     * concurrent deliveries of the same message from both being handled.
     *
     * @param msg      the received message; its keys, message id and body are read
     * @param consumer business callback invoked with the body decoded as a string
     */
    @Override
    public void saveMqConsumer(MessageExt msg, Consumer<String> consumer) {
        log.info("MqConsumerServiceImpl saveMqConsumer messageExt:{}", GsonUtils.toJsonString(msg));
        // 1. Validation
        // 1.1 Skip messages that were already consumed. An existence check is used
        //     instead of fetching a single row so that several rows with the same
        //     keys still count as "already consumed" rather than failing the lookup.
        String keys = msg.getKeys();
        boolean alreadyConsumed = this.lambdaQuery().eq(MqConsumer::getPropertyKeys, keys).count() > 0;
        if (alreadyConsumed) {
            log.info("消息已被消费，不再处理");
            return;
        }
        // 1.2 Skip messages without content. A null body is treated like an empty one.
        //     The body is decoded explicitly as UTF-8 instead of the platform default
        //     charset (assumes producers encode as UTF-8 — confirm with the senders).
        byte[] rawBody = msg.getBody();
        String body = rawBody == null ? null : new String(rawBody, StandardCharsets.UTF_8);
        if (StringUtils.isBlank(body)) {
            log.info("消息内容为空");
            return;
        }
        // 2. Run the business logic
        consumer.accept(body);
        // 3. Persist the consumption record; one timestamp keeps create/update time equal.
        LocalDateTime now = LocalDateTime.now();
        MqConsumer mqConsumer = new MqConsumer();
        mqConsumer.setPropertyKeys(keys);
        mqConsumer.setMsgid(msg.getMsgId());
        mqConsumer.setMessage(body);
        mqConsumer.setCreateTime(now);
        mqConsumer.setUpdateTime(now);
        this.save(mqConsumer);
    }
}
